/*
 * Copyright 2024 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package com.netflix.maestro.engine.steps;

import com.netflix.maestro.annotations.VisibleForTesting;
import com.netflix.maestro.engine.concurrency.InstanceStepConcurrencyHandler;
import com.netflix.maestro.engine.dao.MaestroStepInstanceActionDao;
import com.netflix.maestro.engine.dao.MaestroStepInstanceDao;
import com.netflix.maestro.engine.dao.MaestroWorkflowInstanceDao;
import com.netflix.maestro.engine.db.ForeachIterationOverview;
import com.netflix.maestro.engine.db.StepAction;
import com.netflix.maestro.engine.execution.RunRequest;
import com.netflix.maestro.engine.execution.StepRuntimeSummary;
import com.netflix.maestro.engine.execution.WorkflowSummary;
import com.netflix.maestro.engine.handlers.WorkflowActionHandler;
import com.netflix.maestro.engine.properties.ForeachStepRuntimeProperties;
import com.netflix.maestro.engine.utils.StepHelper;
import com.netflix.maestro.exceptions.MaestroInternalError;
import com.netflix.maestro.exceptions.MaestroRetryableError;
import com.netflix.maestro.models.Actions;
import com.netflix.maestro.models.Constants;
import com.netflix.maestro.models.Defaults;
import com.netflix.maestro.models.artifact.Artifact;
import com.netflix.maestro.models.artifact.ForeachArtifact;
import com.netflix.maestro.models.definition.ForeachStep;
import com.netflix.maestro.models.definition.Step;
import com.netflix.maestro.models.definition.Tag;
import com.netflix.maestro.models.definition.Workflow;
import com.netflix.maestro.models.error.Details;
import com.netflix.maestro.models.initiator.UpstreamInitiator;
import com.netflix.maestro.models.instance.ForeachAction;
import com.netflix.maestro.models.instance.ForeachStepOverview;
import com.netflix.maestro.models.instance.RestartConfig;
import com.netflix.maestro.models.instance.RunPolicy;
import com.netflix.maestro.models.instance.WorkflowInstance;
import com.netflix.maestro.models.instance.WorkflowRollupOverview;
import com.netflix.maestro.models.parameter.MapParameter;
import com.netflix.maestro.models.parameter.ParamDefinition;
import com.netflix.maestro.models.parameter.Parameter;
import com.netflix.maestro.models.timeline.TimelineDetailsEvent;
import com.netflix.maestro.models.timeline.TimelineEvent;
import com.netflix.maestro.models.timeline.TimelineLogEvent;
import com.netflix.maestro.queue.MaestroQueueSystem;
import com.netflix.maestro.queue.models.MessageDto;
import com.netflix.maestro.utils.Checks;
import com.netflix.maestro.utils.HashHelper;
import com.netflix.maestro.utils.IdHelper;
import com.netflix.maestro.utils.ObjectHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
 * Foreach step runtime. It supports empty loop as well. Foreach step will manage the run strategy
 * of all workflow instances it generates by itself to reduce the load on run strategy manager.
 * Additionally, there is no run_properties for workflow instance generated by foreach step.
 *
 * <p>Note that foreach inline workflow cannot be restarted by users. But its run_id might not match
 * the upstream workflow with the corresponding foreach step if users restart the foreach step.
 *
 * <p>In foreach restart, cannot change the foreach parameters, so we ensure the iteration number is
 * the same. Otherwise, users should start a new run.
 */
@Slf4j
@AllArgsConstructor
public class ForeachStepRuntime implements StepRuntime {
  // Local aliases of shared constants. FOREACH_TAG_NAME is used both as the inline workflow id
  // prefix and as the tag attached to every foreach iteration run request.
  private static final String FOREACH_TAG_NAME = Constants.FOREACH_INLINE_WORKFLOW_PREFIX;
  private static final String LOOP_PARAMS_NAME = Constants.LOOP_PARAMS_NAME;
  private static final String INDEX_PARAM_NAME = Constants.INDEX_PARAM_NAME;
  private static final int FOREACH_ITERATION_LIMIT = Constants.FOREACH_ITERATION_LIMIT;

  // Injected collaborators. NOTE: Lombok @AllArgsConstructor derives the constructor parameter
  // order from the declaration order of these non-static fields, so reordering them changes the
  // constructor signature used by callers.
  private final WorkflowActionHandler actionHandler;
  private final MaestroWorkflowInstanceDao instanceDao;
  private final MaestroStepInstanceDao stepInstanceDao;
  private final MaestroStepInstanceActionDao actionDao;
  private final MaestroQueueSystem queueSystem;
  private final InstanceStepConcurrencyHandler instanceStepConcurrencyHandler;
  private final ForeachStepRuntimeProperties properties;

  /**
   * Starts the foreach step by building its initial foreach artifact.
   *
   * <p>On success the step is reported as {@link State#DONE} with the artifact keyed by its type. A
   * {@link MaestroRetryableError} is propagated unchanged so the caller can retry; any other
   * failure is logged and reported as {@link State#FATAL_ERROR} with the error details.
   */
  @Override
  public Result start(
      WorkflowSummary workflowSummary, Step step, StepRuntimeSummary runtimeSummary) {
    try {
      Artifact foreachArtifact = createArtifact(workflowSummary, runtimeSummary);
      String artifactKey = foreachArtifact.getType().key();
      return new Result(
          State.DONE,
          Collections.singletonMap(artifactKey, foreachArtifact),
          Collections.emptyList());
    } catch (MaestroRetryableError retryable) {
      // let the parent logic handle the retry
      throw retryable;
    } catch (Exception e) {
      LOG.error(
          "Failed to start foreach workflow step runtime for {}{}",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity(),
          e);
      Details failure =
          Details.create(e, false, "Failed to start foreach workflow step runtime with an error");
      return new Result(
          State.FATAL_ERROR,
          Collections.emptyMap(),
          Collections.singletonList(TimelineDetailsEvent.from(failure)));
    }
  }

  /**
   * Builds the initial {@link ForeachArtifact} for this foreach step run.
   *
   * <p>The artifact starts with foreach run id 1 and loop index 0. When the workflow run is not
   * fresh, or the step retry reports it is not retryable, the foreach run id instead continues
   * after the largest existing foreach run id of the inline workflow. For {@link
   * RunPolicy#RESTART_FROM_INCOMPLETE} and {@link RunPolicy#RESTART_FROM_SPECIFIC}, the overview,
   * rollup, ancestor iteration count and next loop index are additionally seeded from the latest
   * previous foreach artifact. An iteration targeted by a restart-from-specific path is recorded as
   * a pending RESTART action.
   *
   * @param workflowSummary summary of the workflow instance that owns the foreach step
   * @param runtimeSummary runtime summary of the foreach step
   * @return the initialized foreach artifact; fails via {@code Checks.checkTrue} if the iteration
   *     count exceeds {@code FOREACH_ITERATION_LIMIT}
   */
  private ForeachArtifact createArtifact(
      WorkflowSummary workflowSummary, StepRuntimeSummary runtimeSummary) {
    int total = getLoopParamsTotalCount(runtimeSummary);
    // Checks.checkTrue uses String.format-style %s placeholders, so every argument needs a %s.
    Checks.checkTrue(
        total <= FOREACH_ITERATION_LIMIT,
        "Foreach iteration number %s is over the loop size limit %s for step %s%s",
        total,
        FOREACH_ITERATION_LIMIT,
        workflowSummary.getIdentity(),
        runtimeSummary.getIdentity());

    ForeachArtifact artifact = new ForeachArtifact();
    artifact.setForeachWorkflowId(generateForeachWorkflowId(workflowSummary, runtimeSummary));
    artifact.setForeachIdentity(workflowSummary.getIdentity() + runtimeSummary.getIdentity());
    artifact.setForeachRunId(1L);

    // inline run should match its upstream run.
    artifact.setRunId(workflowSummary.getWorkflowRunId());
    if (runtimeSummary.getRestartConfig() != null) {
      artifact.setRunPolicy(runtimeSummary.getRestartConfig().getRestartPolicy());
    } else {
      artifact.setRunPolicy(workflowSummary.getRunPolicy());
    }

    artifact.setNextLoopIndex(0);
    artifact.setTotalLoopCount(total);
    artifact.setForeachOverview(new ForeachStepOverview());

    if (!workflowSummary.isFreshRun() || !runtimeSummary.getStepRetry().isRetryable()) {
      artifact.setForeachRunId(
          instanceDao.getLargestForeachRunIdFromRuns(artifact.getForeachWorkflowId()) + 1L);

      if (artifact.getRunPolicy() == RunPolicy.RESTART_FROM_INCOMPLETE
          || artifact.getRunPolicy() == RunPolicy.RESTART_FROM_SPECIFIC) {
        ForeachArtifact prevArtifact =
            stepInstanceDao.getLatestForeachArtifact(
                workflowSummary.getWorkflowId(),
                workflowSummary.getWorkflowInstanceId(),
                runtimeSummary.getStepId());
        if (prevArtifact != null && prevArtifact.getForeachOverview() != null) {
          RestartConfig config =
              ObjectHelper.valueOrDefault(
                  runtimeSummary.getRestartConfig(), workflowSummary.getRestartConfig());
          long toRestart = // this is the iteration id specified during restart from specific path
              RunRequest.getNextNode(config)
                  .filter(
                      node -> Objects.equals(node.getWorkflowId(), artifact.getForeachWorkflowId()))
                  .map(RestartConfig.RestartNode::getInstanceId)
                  .orElse(0L);

          long prevMaxIterationId =
              artifact
                  .getForeachOverview()
                  .initiateAndGetByPrevMaxIterationId(prevArtifact.getForeachOverview(), toRestart);

          initializeForeachArtifactRollup(
              artifact.getForeachOverview(),
              prevArtifact.getForeachOverview(),
              artifact.getForeachWorkflowId());

          if (toRestart != 0) {
            // Set pending action for restart
            artifact.setPendingAction(
                ForeachAction.builder()
                    .instanceId(toRestart)
                    .instanceRunId(Constants.LATEST_ONE) // unknown and will use base run + 1
                    .restartConfig(config)
                    .action(Actions.StepInstanceAction.RESTART)
                    .user(SYSTEM_USER)
                    .createTime(System.currentTimeMillis())
                    .build());
          }

          // Keep the largest iteration id seen across all previous runs.
          if (prevArtifact.getAncestorIterationCount() != null) {
            artifact.setAncestorIterationCount(
                Math.max(prevMaxIterationId, prevArtifact.getAncestorIterationCount()));
          } else {
            artifact.setAncestorIterationCount(prevMaxIterationId);
          }
          artifact.setNextLoopIndex((int) artifact.getForeachOverview().getCheckpoint());
        }
      }
    }

    return artifact;
  }

  /**
   * Seeds the current run's step rollup from the previous run so that rollups aggregate across
   * runs.
   *
   * <p>The iterations that will run again in this run are derived from the previous overview. Their
   * latest-run rollups are aggregated and handed to {@link
   * ForeachStepOverview#initiateStepRollup}, together with the previous run's rollup.
   *
   * @param foreachOverview foreach step overview of the current run ({@code initiateStepRollup} is
   *     invoked on it)
   * @param prevForeachOverview foreach step overview of the previous run
   * @param foreachWorkflowId id of the inline foreach workflow whose instances are the iterations
   */
  @VisibleForTesting
  void initializeForeachArtifactRollup(
      ForeachStepOverview foreachOverview,
      ForeachStepOverview prevForeachOverview,
      String foreachWorkflowId) {
    Set<Long> rerunIterationIds =
        foreachOverview.getIterationsToRunFromDetails(prevForeachOverview);
    WorkflowRollupOverview rerunRollup =
        getAggregatedRollupFromIterations(foreachWorkflowId, rerunIterationIds);

    foreachOverview.initiateStepRollup(prevForeachOverview.getRollup(), rerunRollup);
  }

  /**
   * Aggregates the latest-run rollups of the given iterations, querying them in batches of {@code
   * properties.getGetRollupBatchLimit()} ids.
   *
   * @param foreachWorkflowId id of the inline foreach workflow
   * @param iterationsIds iteration ids (foreach workflow instance ids) whose rollups are aggregated
   * @return the aggregated rollup; an empty rollup when there are no ids or no workflow id
   */
  private WorkflowRollupOverview getAggregatedRollupFromIterations(
      String foreachWorkflowId, Set<Long> iterationsIds) {
    WorkflowRollupOverview aggregated = new WorkflowRollupOverview();

    if (ObjectHelper.isCollectionEmptyOrNull(iterationsIds)
        || ObjectHelper.isNullOrEmpty(foreachWorkflowId)) {
      return aggregated;
    }

    final int batchLimit = properties.getGetRollupBatchLimit();
    // A zero limit would never advance the loop below (infinite loop) and a negative one would
    // make subList throw, so reject a misconfigured limit explicitly.
    Checks.checkTrue(
        batchLimit > 0,
        "Invalid rollup batch limit [%s] for foreach workflow [%s]",
        batchLimit,
        foreachWorkflowId);

    List<Long> iterationsList = new ArrayList<>(iterationsIds);
    for (int start = 0; start < iterationsList.size(); start += batchLimit) {
      List<Long> batch =
          iterationsList.subList(start, Math.min(start + batchLimit, iterationsList.size()));

      List<WorkflowRollupOverview> rollups =
          instanceDao.getBatchForeachLatestRunRollupForIterations(foreachWorkflowId, batch);

      rollups.forEach(aggregated::aggregate);
    }

    return aggregated;
  }

  /**
   * Computes the number of foreach iterations, i.e. the size of the cross product of all evaluated
   * loop params. An empty array param makes the product 0, which yields an empty loop.
   *
   * <p>The product is accumulated in a {@code long}. A cross product larger than {@link
   * Integer#MAX_VALUE} is therefore rejected explicitly instead of silently wrapping around. Wrapping
   * could produce 0 (a silent empty loop) or a negative count that slips past the iteration limit
   * check.
   *
   * @param runtimeSummary runtime summary holding the evaluated loop params
   * @return the total iteration count
   * @throws MaestroInternalError if a loop param is not an array type or the product overflows int
   */
  private int getLoopParamsTotalCount(StepRuntimeSummary runtimeSummary) {
    MapParameter loopParams = runtimeSummary.getParams().get(LOOP_PARAMS_NAME).asMapParam();
    long total = 1L;
    for (String name : loopParams.getParamNames()) {
      Parameter param = loopParams.getEvaluatedParam(name);
      int length;
      switch (param.getType()) {
        case STRING_ARRAY:
          length = param.asStringArray().length;
          break;
        case LONG_ARRAY:
          length = param.asLongArray().length;
          break;
        case DOUBLE_ARRAY:
          length = param.asDoubleArray().length;
          break;
        case BOOLEAN_ARRAY:
          length = param.asBooleanArray().length;
          break;
        default:
          throw new MaestroInternalError(
              "Invalid loop param with name [%s] and type [%s] for foreach step [%s]",
              name, param.getType(), runtimeSummary.getIdentity());
      }
      // total <= Integer.MAX_VALUE and length <= Integer.MAX_VALUE, so this cannot overflow long.
      total *= length;
      if (total > Integer.MAX_VALUE) {
        throw new MaestroInternalError(
            "Foreach loop params cross product overflows int at param [%s] for foreach step [%s]",
            name, runtimeSummary.getIdentity());
      }
    }
    return (int) total;
  }

  /**
   * Generates the id of the inline workflow that runs this foreach step's iterations.
   *
   * <p>The id has the form {@code <prefix>_<hashKey(internalId)>_<rangeKey(instanceId)>_<md5>}.
   * Here instanceId is the current workflow instance id or, when the current workflow was itself
   * launched inline, the instance id of its non-inline parent. The md5 covers the step id, the
   * current workflow instance id and the workflow id.
   */
  private String generateForeachWorkflowId(
      WorkflowSummary workflowSummary, StepRuntimeSummary runtimeSummary) {
    boolean launchedInline = workflowSummary.getInitiator().getType().isInline();
    long rangeInstanceId =
        launchedInline
            ? ((UpstreamInitiator) workflowSummary.getInitiator())
                .getNonInlineParent()
                .getInstanceId()
            : workflowSummary.getWorkflowInstanceId();

    String prefix = FOREACH_TAG_NAME;
    String hashPart = IdHelper.hashKey(workflowSummary.getInternalId());
    String rangePart = IdHelper.rangeKey(rangeInstanceId);
    String digest =
        HashHelper.md5(
            runtimeSummary.getStepId(),
            String.valueOf(workflowSummary.getWorkflowInstanceId()),
            workflowSummary.getWorkflowId());
    return String.format("%s_%s_%s_%s", prefix, hashPart, rangePart, digest);
  }

  /**
   * Foreach execute to launch and monitor foreach iterations.
   *
   * <p>On each call it performs these stages in order:
   *
   * <ul>
   *   <li>Saves a newly issued pending action into the artifact if feasible, returning early with
   *       {@link State#CONTINUE}.
   *   <li>Validates the next loop index and calls {@code refreshIterationOverview}.
   *   <li>Takes a saved pending action if feasible, again returning early.
   *   <li>Applies the step failure mode when any iteration is FAILED.
   *   <li>Otherwise, launches up to the allowed number of new iterations.
   * </ul>
   *
   * <p>Once nothing is running and nothing more will be launched, the final state comes from
   * {@code deriveStepStateOnceDone}.
   *
   * <p>A {@link MaestroRetryableError} is reported as {@link State#CONTINUE} with its details; any
   * other exception is reported as {@link State#FATAL_ERROR}.
   */
  @Override
  public Result execute(
      WorkflowSummary workflowSummary, Step step, StepRuntimeSummary runtimeSummary) {
    try {
      ForeachArtifact artifact =
          runtimeSummary.getArtifacts().get(Artifact.Type.FOREACH.key()).asForeach();

      // Persist a newly issued action (if any) into the artifact before doing anything else.
      Optional<TimelineEvent> saveActionTimeline =
          savePendingActionIfFeasible(runtimeSummary, artifact);
      if (saveActionTimeline.isPresent()) {
        return new Result(
            State.CONTINUE,
            Collections.singletonMap(artifact.getType().key(), artifact),
            Collections.singletonList(saveActionTimeline.get()));
      }

      final int total = artifact.getTotalLoopCount();
      // nextLoopIndex is the 0-based index of the next iteration to launch; valid values are
      // 0..total, where total means every iteration has already been launched.
      int index = artifact.getNextLoopIndex();
      if (index < 0 || index > total) { // assert the index value
        throw new MaestroInternalError(
            "Invalid index value [%s] for %s%s",
            index, workflowSummary.getIdentity(), runtimeSummary.getIdentity());
      }
      refreshIterationOverview(artifact);

      ForeachStep foreachStep = (ForeachStep) step;
      // Take restart action and update foreach artifact including overview and rollup
      Optional<TimelineEvent> takeActionTimeline =
          takePendingActionIfFeasible(workflowSummary, foreachStep, runtimeSummary, artifact);
      if (takeActionTimeline.isPresent()) {
        return new Result(
            State.CONTINUE,
            Collections.singletonMap(artifact.getType().key(), artifact),
            Collections.singletonList(takeActionTimeline.get()));
      }

      // Running-iteration count, computed with the step's strict-ordering flag (or its default).
      final long nonTerminalCount =
          artifact
              .getForeachOverview()
              .getRunningStatsCount(
                  ObjectHelper.valueOrDefault(
                      foreachStep.getStrictOrdering(),
                      Defaults.DEFAULT_FOREACH_STRICT_ORDERING_ENABLED));
      final boolean failed =
          artifact.getForeachOverview().statusExistInIterations(WorkflowInstance.Status.FAILED);
      boolean done = false;
      TimelineEvent timelineEvent = null;
      if (failed) {
        switch (step.getFailureMode()) {
          case FAIL_IMMEDIATELY:
            // Pin the index at total so that no new iteration is launched (launchLimit becomes 0).
            index = total;
            try {
              done = artifact.getForeachOverview().getRunningStatsCount(false) == 0;
              if (!done) {
                // only terminate itself and maestro task will take care of instance termination
                actionDao.terminate(
                    workflowSummary,
                    step.getId(),
                    SYSTEM_USER,
                    Actions.StepInstanceAction.KILL,
                    "Foreach step with FAIL_IMMEDIATELY terminates itself due to a failed iteration");
                return new Result(
                    State.CONTINUE,
                    Collections.singletonMap(artifact.getType().key(), artifact),
                    Collections.singletonList(
                        TimelineLogEvent.warn(
                            "Terminating the foreach step with FAIL_IMMEDIATELY because of a failed iteration. "
                                + "It will terminate all running iterations as well.")));
              }
            } catch (Exception e) {
              // `done` stays false here, so this call returns CONTINUE and the termination is
              // attempted again on a later execute call.
              // NOTE(review): the caught exception is neither logged nor attached to the timeline
              // event; consider logging it to aid debugging.
              timelineEvent =
                  TimelineLogEvent.warn(
                      "Failed to stop all running foreach iterations. Will retry it.");
            }
            break;
          case IGNORE_FAILURE:
            // will not continue the iterations but foreach step will be COMPLETED_WITH_ERROR.
            // Intentional fall-through: shares the FAIL_AFTER_RUNNING handling below.
          case FAIL_AFTER_RUNNING:
            // A negative index is a "stop launching" sentinel (launchLimit becomes 0 below), while
            // Math.abs(index) still records the progress in the artifact.
            // NOTE(review): when index is 0, negation is a no-op and new iterations could still be
            // launched; confirm a FAILED iteration always implies index > 0 here.
            index = -index; // keep the progress
            timelineEvent =
                TimelineLogEvent.warn(
                    "Foreach step will be failed after all running steps because of a step failure.");
            done = nonTerminalCount == 0L;
            break;
          default:
            throw new MaestroInternalError(
                "Invalid failure mode: %s for foreach step %s%s",
                step.getFailureMode(), workflowSummary.getIdentity(), runtimeSummary.getIdentity());
        }
      } else {
        // No failure: done once every iteration was launched and none is still running.
        done = nonTerminalCount == 0L && index == total;
      }

      State state;
      if (done) {
        state = deriveStepStateOnceDone(artifact);
      } else {
        // How many new iterations may be launched now: bounded by getConcurrency and by the
        // remaining iterations; 0 when the negative "stop launching" sentinel is set.
        final long launchLimit =
            index < 0
                ? 0
                : Math.min(
                    getConcurrency(workflowSummary, foreachStep, nonTerminalCount), total - index);
        if (launchLimit > 0) {
          AtomicInteger nextIndex = new AtomicInteger(index);
          timelineEvent =
              runForeachIterations(
                  workflowSummary,
                  foreachStep,
                  runtimeSummary,
                  artifact,
                  nextIndex,
                  (int) launchLimit);
          index = nextIndex.get();
        } else {
          LOG.debug(
              "In step runtime {}{}, no newly created foreach workflow instances at iteration [{}]",
              workflowSummary.getIdentity(),
              runtimeSummary.getIdentity(),
              Math.abs(index));
        }
        state = State.CONTINUE;
      }
      // update foreach artifact, might be lagging but will be consistent at the end
      artifact.setNextLoopIndex(Math.abs(index));

      return new Result(
          state,
          Collections.singletonMap(artifact.getType().key(), artifact),
          timelineEvent == null
              ? Collections.emptyList()
              : Collections.singletonList(timelineEvent));
    } catch (MaestroRetryableError mre) {
      LOG.info("Failed to execute foreach step runtime, will retry", mre);
      return new Result(
          State.CONTINUE,
          Collections.emptyMap(),
          Collections.singletonList(TimelineDetailsEvent.from(mre.getDetails())));
    } catch (Exception e) {
      LOG.error(
          "Failed to execute foreach workflow step runtime {}{}",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity(),
          e);
      return new Result(
          State.FATAL_ERROR,
          Collections.emptyMap(),
          Collections.singletonList(
              TimelineDetailsEvent.from(
                  Details.create(
                      e, false, "Failed to execute foreach workflow step runtime with an error"))));
    }
  }

  /**
   * Derives the terminal step state once all iterations have finished.
   *
   * <p>The overview is refreshed first and must report no running iterations. Statuses are then
   * checked in priority order:
   *
   * <ul>
   *   <li>any FAILED iteration yields {@link State#FATAL_ERROR};
   *   <li>otherwise any TIMED_OUT iteration yields {@link State#TIMED_OUT};
   *   <li>otherwise any STOPPED iteration yields {@link State#STOPPED};
   *   <li>otherwise the result is {@link State#DONE}.
   * </ul>
   *
   * @param artifact the foreach artifact whose overview is inspected
   * @return the terminal step state
   */
  private State deriveStepStateOnceDone(ForeachArtifact artifact) {
    refreshIterationOverview(artifact);
    ForeachStepOverview overview = artifact.getForeachOverview();
    // The message needs a %s placeholder, otherwise the overview argument is silently dropped.
    Checks.checkTrue(
        overview.getRunningStatsCount(false) == 0,
        "Invalid running stats in foreach artifact overview: %s",
        overview);
    if (overview.statusExistInIterations(WorkflowInstance.Status.FAILED)) {
      return State.FATAL_ERROR;
    } else if (overview.statusExistInIterations(WorkflowInstance.Status.TIMED_OUT)) {
      return State.TIMED_OUT;
    } else if (overview.statusExistInIterations(WorkflowInstance.Status.STOPPED)) {
      return State.STOPPED;
    } else {
      return State.DONE;
    }
  }

  /**
   * Launches the next batch of foreach iterations and logs the outcome.
   *
   * @param nextIndex in: 0-based index of the next iteration to launch; out: advanced by {@code
   *     launchForeachIterations}
   * @param launchLimit maximum number of iterations to launch in this call
   * @return a timeline event carrying the error details if launching failed, otherwise {@code null}
   */
  private TimelineEvent runForeachIterations(
      WorkflowSummary workflowSummary,
      ForeachStep foreachStep,
      StepRuntimeSummary runtimeSummary,
      ForeachArtifact artifact,
      AtomicInteger nextIndex,
      int launchLimit) {
    final int startIndex = nextIndex.get();
    Optional<Details> launchError =
        launchForeachIterations(
            workflowSummary, foreachStep, runtimeSummary, artifact, nextIndex, launchLimit);
    if (!launchError.isPresent()) {
      LOG.info(
          "In step runtime {}{}, have created foreach workflow instances from iteration [{}] to [{}]",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity(),
          startIndex + 1,
          nextIndex.get());
      return null;
    }

    LOG.warn(
        "In step runtime {}{}, failed to start foreach workflow failed at iteration [{}], will try it again",
        workflowSummary.getIdentity(),
        runtimeSummary.getIdentity(),
        nextIndex.get());
    return TimelineDetailsEvent.from(launchError.get());
  }

  /**
   * Saves a step-level RESTART action from the runtime summary as the artifact's pending foreach
   * action, when it targets an already launched, restartable and terminal iteration of this foreach
   * step.
   *
   * <p>The action is ignored (empty result) in any of these cases:
   *
   * <ul>
   *   <li>it is not a step-level RESTART with a restart config;
   *   <li>its restart path has fewer than two nodes;
   *   <li>the 2nd-to-last path node is not this foreach's inline workflow;
   *   <li>the targeted iteration has not been launched yet;
   *   <li>another pending action already exists;
   *   <li>the overview does not consider the iteration restartable;
   *   <li>the iteration's latest run is not terminal.
   * </ul>
   *
   * <p>A failure while loading the iteration is logged (with its stack trace) and the action is
   * ignored.
   *
   * @param runtimeSummary runtime summary carrying the pending step action, if any
   * @param artifact foreach artifact that receives the pending action
   * @return a timeline event if the pending action was saved, empty otherwise
   */
  private Optional<TimelineEvent> savePendingActionIfFeasible(
      StepRuntimeSummary runtimeSummary, ForeachArtifact artifact) {
    StepAction action = runtimeSummary.getPendingAction();
    if (action == null) {
      return Optional.empty();
    }

    boolean supportedAction =
        !action.isWorkflowAction()
            && action.getAction() == Actions.StepInstanceAction.RESTART // only support restart
            && action.getRestartConfig() != null;
    if (!supportedAction) {
      LOG.info(
          "Get an unsupported action [{}] at foreach step [{}] and ignore it",
          action,
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    List<RestartConfig.RestartNode> path = action.getRestartConfig().getRestartPath();
    if (path == null || path.size() <= 1) {
      LOG.info(
          "Get an invalid action path [{}] at foreach step [{}] and ignore it",
          path,
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    // pick up 2nd to the last restart node, which is the foreach iteration of foreach step
    RestartConfig.RestartNode restartNode = path.get(path.size() - 2);
    if (!Objects.equals(restartNode.getWorkflowId(), artifact.getForeachWorkflowId())) {
      LOG.warn(
          "Get an invalid action path node [{}] at foreach step [{}] and ignore it",
          restartNode,
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    // index is 0-based and iteration id is 1-based.
    if (artifact.getNextLoopIndex() < restartNode.getInstanceId()) {
      LOG.info(
          "Get an action while the current run has not yet looped over it at foreach step [{}] and ignore it",
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    if (artifact.getPendingAction() != null) {
      LOG.info(
          "Get an action while there is already an ongoing action at foreach step [{}] and ignore it",
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    if (!artifact
        .getForeachOverview()
        .isForeachIterationRestartable(restartNode.getInstanceId())) {
      LOG.info(
          "Get an action while iteration [{}]'s status is unknown or not failed at foreach step [{}] and ignore it",
          restartNode.getInstanceId(),
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    try {
      WorkflowInstance instance =
          instanceDao.getLatestWorkflowInstanceRun(
              restartNode.getWorkflowId(), restartNode.getInstanceId());

      // support restarting terminated iterations not in succeeded status
      if (!instance.getStatus().isTerminal()) {
        LOG.info(
            "Get an action while iteration [{}]'s status is not-actionable [{}] at foreach step [{}] and ignore it",
            restartNode.getInstanceId(),
            instance.getStatus(),
            artifact.getForeachIdentity());
        return Optional.empty();
      }

      artifact.setPendingAction(
          ForeachAction.builder()
              .instanceId(restartNode.getInstanceId())
              .instanceRunId(instance.getWorkflowRunId())
              .restartConfig(action.getRestartConfig())
              .action(action.getAction())
              .user(action.getUser())
              .createTime(action.getCreateTime())
              .build());
      return Optional.of(
          TimelineLogEvent.info(
              "Saved a pending action for iteration [%s]", restartNode.getInstanceId()));
    } catch (Exception e) {
      // Pass the exception as the trailing argument so SLF4J records its stack trace.
      LOG.warn(
          "Failed to save an action [{}] at foreach step [{}] and will retry it",
          action,
          artifact.getForeachIdentity(),
          e);
      return Optional.empty();
    }
  }

  /**
   * Takes the artifact's pending foreach action (a RESTART of one iteration) if the targeted
   * iteration run is terminal.
   *
   * <p>Restart ignores the concurrency limit. This is acceptable because, once an iteration fails,
   * the foreach step runtime no longer launches new iterations: depending on the failure mode it
   * either terminates all iterations or waits for the running iterations to complete. Therefore
   * the foreach ordering is preserved too.
   *
   * <p>If {@code actionHandler.restartForeachInstance} returns error details, the pending action is
   * kept and the details are returned, so the action is attempted again on a later call. This
   * method may therefore run multiple times for the same action and has to be idempotent.
   *
   * <p>The pending action is cleared in three cases:
   *
   * <ul>
   *   <li>the restart succeeds, in which case the overview is also updated for the restart;
   *   <li>the action is out-dated, i.e. the iteration run no longer matches or is not terminal;
   *   <li>an exception is thrown, which is treated as fatal and not retried.
   * </ul>
   *
   * <p>Note that if users restart an iteration in other terminal (e.g. STOPPED or SUCCEEDED)
   * states, the concurrency limit and the foreach iteration order will be violated. We may need to
   * improve it to add the concurrency control.
   *
   * @return a timeline event describing the outcome, or empty if there was nothing to do or the
   *     action was out-dated
   */
  private Optional<TimelineEvent> takePendingActionIfFeasible(
      WorkflowSummary workflowSummary,
      ForeachStep foreachStep,
      StepRuntimeSummary runtimeSummary,
      ForeachArtifact artifact) {
    ForeachAction foreachAction = artifact.getPendingAction();
    if (foreachAction == null) {
      return Optional.empty();
    }

    try {
      WorkflowInstance instance =
          instanceDao.getWorkflowInstanceRun(
              artifact.getForeachWorkflowId(),
              foreachAction.getInstanceId(),
              foreachAction.getInstanceRunId());
      if ((foreachAction.getInstanceRunId() == Constants.LATEST_ONE
              || instance.getWorkflowRunId() == foreachAction.getInstanceRunId())
          && instance.getStatus().isTerminal()) {
        int restartIterationId = (int) foreachAction.getInstanceId();
        ForeachArtifact restartArtifact = new ForeachArtifact();
        restartArtifact.setForeachWorkflowId(artifact.getForeachWorkflowId());
        restartArtifact.setForeachRunId(
            Math.max(artifact.getForeachRunId(), foreachAction.getInstanceRunId()) + 1);

        // iteration ids are 1-based while loop indexes are 0-based, hence the "- 1" below.
        RunRequest runRequest =
            StepHelper.createInternalWorkflowRunRequest(
                    workflowSummary,
                    runtimeSummary,
                    Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
                    createForeachRunParams(
                        restartIterationId - 1, workflowSummary, foreachStep, runtimeSummary),
                    generateDedupKey(restartArtifact, restartIterationId - 1),
                    null)
                .toBuilder()
                .restartConfig(foreachAction.getRestartConfig())
                .build();

        // capture the pre-restart status and rollup so the overview can be adjusted afterwards.
        WorkflowInstance.Status oldStatus = instance.getStatus();
        WorkflowRollupOverview oldOverview =
            instance.getRuntimeOverview() == null
                ? null
                : instance.getRuntimeOverview().getRollupOverview();

        Optional<Details> result =
            actionHandler.restartForeachInstance(
                runRequest, instance, foreachStep.getId(), restartArtifact.getForeachRunId());
        if (result.isPresent()) {
          LOG.info(
              "Failed to take the action for foreach step {}{} due to [{}] and will retry it later",
              workflowSummary.getIdentity(),
              runtimeSummary.getIdentity(),
              result.get());
          return result.map(TimelineDetailsEvent::from);
        } else {
          LOG.info(
              "Successfully took the pending action for foreach step {}{} and will update artifact together",
              workflowSummary.getIdentity(),
              runtimeSummary.getIdentity());
          artifact
              .getForeachOverview()
              .updateForRestart(
                  foreachAction.getInstanceId(), instance.getStatus(), oldStatus, oldOverview);
          artifact.setPendingAction(null);
          return Optional.of(
              TimelineLogEvent.info("Successfully take an action: [%s]", foreachAction));
        }
      } else {
        LOG.info(
            "Got an out-dated pending action [{}] for foreach step {}{} and ignore it...",
            foreachAction,
            workflowSummary.getIdentity(),
            runtimeSummary.getIdentity());
        artifact.setPendingAction(null);
        return Optional.empty();
      }
    } catch (Exception e) {
      // Pass the exception as the trailing argument so SLF4J records its stack trace as well.
      LOG.warn(
          "Fatally failed to take the action for foreach step {}{} due to [{}] and will not retry it",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity(),
          e.getMessage(),
          e);
      artifact.setPendingAction(null);
      return Optional.of(
          TimelineDetailsEvent.from(
              Details.create(
                  e,
                  false,
                  "Failed to take the action for foreach step with an error and won't retry")));
    }
  }

  /**
   * Launch up to {@code launchLimit} new foreach iterations. The original doc ties this to {@link
   * ForeachStepRuntimeProperties#getLoopBatchLimit()}; presumably the caller folds that limit into
   * {@code launchLimit}, TODO confirm. It might break a large batch into small chunks to satisfy the
   * {@link ForeachStepRuntimeProperties#getInsertBatchLimit()} size limit. For retryable errors, it
   * returns details. For fatal errors, it throws an exception.
   *
   * <p>Iterations returned by {@code getSkippedIterationsInRange} are not launched. Launching also
   * stops early when {@code instanceStepConcurrencyHandler.addInstance} rejects an iteration.
   *
   * @param index in: 0-based index of the next iteration to launch; out: set past the last
   *     iteration of each batch that {@code runForeachBatch} accepted, so it carries the next loop
   *     index
   * @param launchLimit maximum number of (non-skipped) iterations to launch in this call
   * @return error details from {@code runForeachBatch}, or empty if every batch was accepted.
   */
  @SuppressWarnings({"PMD.AvoidReassigningLoopVariables"})
  private Optional<Details> launchForeachIterations(
      WorkflowSummary workflowSummary,
      ForeachStep step,
      StepRuntimeSummary runtimeSummary,
      ForeachArtifact artifact,
      AtomicInteger index,
      final int launchLimit) {
    Workflow inlineWorkflow =
        createInlineWorkflow(artifact.getForeachWorkflowId(), artifact.getForeachIdentity(), step);
    List<RunRequest> runRequests = new ArrayList<>();
    List<Long> instanceIds = new ArrayList<>();

    // 1-based iteration ids between the next iteration and the ancestor iteration count that the
    // overview reports as skipped; these are never launched below.
    Set<Long> skippedIterations =
        artifact
            .getForeachOverview()
            .getSkippedIterationsInRange(index.get() + 1, artifact.getAncestorIterationCount());

    boolean isDone = (index.get() >= artifact.getTotalLoopCount());
    // idx is the 0-based loop index; cnt counts iterations accepted in this call.
    for (int idx = index.get(), cnt = 0; !isDone; ++idx) {
      long instanceId = idx + 1; // iteration (instance) ids are 1-based
      if (!skippedIterations.contains(instanceId)) {
        RunRequest runRequest =
            StepHelper.createInternalWorkflowRunRequest(
                workflowSummary,
                runtimeSummary,
                Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
                createForeachRunParams(idx, workflowSummary, step, runtimeSummary),
                generateDedupKey(artifact, idx),
                null);

        if (instanceStepConcurrencyHandler.addInstance(runRequest)) {
          runRequests.add(runRequest);
          instanceIds.add(instanceId);
          cnt++;
          isDone = (cnt == launchLimit);
        } else {
          // Rejected by the concurrency handler: step idx back so index.set(idx + 1) below
          // resumes at this iteration on a later call.
          idx = idx - 1; // exclude the current index as it is not launched
          isDone = true; // end the iteration
        }
      }
      isDone = isDone || (idx == artifact.getTotalLoopCount() - 1);

      // Flush when the batch reaches the insert limit or the loop is ending.
      // NOTE(review): this can call runForeachBatch with empty lists (e.g. the first candidate is
      // rejected, or all remaining iterations are skipped); confirm the handler tolerates that.
      if (runRequests.size() == properties.getInsertBatchLimit() || isDone) {
        // run (start or restart) foreach batch and pass run properties to inline foreach instances
        Optional<Details> details =
            actionHandler.runForeachBatch(
                inlineWorkflow,
                workflowSummary.getInternalId(), // inherit parent unique internalId
                workflowSummary.getWorkflowVersionId(), // inherit parent versionId
                workflowSummary.getRunProperties(), // inherit parent run properties
                step.getId(),
                artifact,
                runRequests,
                instanceIds);
        if (details.isPresent()) {
          // NOTE(review): instances already registered via addInstance for this batch are not
          // released here; confirm they are cleaned up or deduplicated elsewhere.
          return details;
        } else {
          index.set(idx + 1);
          runRequests.clear();
          instanceIds.clear();
        }
      }
    }
    return Optional.empty();
  }

  /**
   * Builds the dedup key attached to the run request of a single foreach iteration, in the form
   * {@code [foreachWorkflowId][foreachRunId][index]}.
   */
  private String generateDedupKey(ForeachArtifact artifact, int index) {
    final var workflowId = artifact.getForeachWorkflowId();
    final var runId = artifact.getForeachRunId();
    return "[%s][%s][%s]".formatted(workflowId, runId, index);
  }

  /**
   * Computes how many more foreach iterations may be launched right now.
   *
   * <p>The effective limit is the workflow instance step concurrency, further capped by the foreach
   * step's own concurrency when one is set. The remaining headroom (never negative) is then capped
   * by {@link ForeachStepRuntimeProperties#getLoopBatchLimit()}.
   *
   * @param summary summary of the parent workflow instance
   * @param step the foreach step definition
   * @param nonTerminalCount number of iterations currently not in a terminal state
   * @return number of additional iterations that can be launched
   */
  private long getConcurrency(WorkflowSummary summary, ForeachStep step, long nonTerminalCount) {
    final long instanceLimit = summary.deriveInstanceStepConcurrency();
    final Long stepLimit = step.getConcurrency();
    final long effectiveLimit =
        (stepLimit == null) ? instanceLimit : Math.min(stepLimit, instanceLimit);
    final long headroom = Math.max(0L, effectiveLimit - nonTerminalCount);
    return Math.min(headroom, properties.getLoopBatchLimit());
  }

  /**
   * Refreshes, in place, the iteration overview stored in the foreach artifact from the DB and
   * returns the run ids of the iterations being restarted.
   *
   * <p>Steps: (1) if a restart is in progress ({@code getFirstRestartIterationId() > 0}), load the
   * overviews of the restarted iterations and verify their count matches the restart info; (2)
   * load the iteration overviews from the current checkpoint and, if any were returned, move the
   * checkpoint to the smallest non-terminal instance id, or to one past the largest loaded id when
   * all are terminal; (3) reset the running stats and re-add the restarted iterations (dropping the
   * terminal ones from the restart info) followed by the loaded, non-skipped iterations; (4)
   * refresh the overview details.
   *
   * @param artifact the foreach artifact whose overview is updated in place
   * @return map from instance id to run id for every restarted iteration loaded in step (1),
   *     including those that have just become terminal; empty when no restart is in progress
   */
  private Map<Long, Long> refreshIterationOverview(ForeachArtifact artifact) {
    ForeachStepOverview stepOverview = artifact.getForeachOverview();

    // overviews of iterations being restarted, restricted to the ids present in the restart info
    List<ForeachIterationOverview> restartResults;
    long restartIterationCheckpoint = stepOverview.getFirstRestartIterationId();
    if (restartIterationCheckpoint > 0) {
      // NOTE(review): the trailing boolean (true here, false below) is interpreted by the DAO;
      // confirm its meaning against getForeachIterationOverviewWithCheckpoint
      restartResults =
          instanceDao
              .getForeachIterationOverviewWithCheckpoint(
                  artifact.getForeachWorkflowId(),
                  artifact.getForeachRunId(),
                  restartIterationCheckpoint,
                  true)
              .stream()
              .filter(o -> stepOverview.getRestartInfo().contains(o.getInstanceId()))
              .collect(Collectors.toList());
      // every restart-info entry must have a matching overview row;
      // NOTE(review): the last message argument is the restart info itself, not its size
      Checks.checkTrue(
          restartResults.size() == stepOverview.getRestartInfo().size(),
          "Invalid: inconsistent status for foreach restart [%s] where info size [%s] != [%s]",
          artifact.getForeachIdentity(),
          restartResults.size(),
          stepOverview.getRestartInfo());
    } else {
      restartResults = Collections.emptyList();
    }

    // read before the checkpoint is moved below; keep this ordering, as the skip list is
    // presumably computed relative to the current checkpoint
    Set<Long> skipList = stepOverview.getSkippedIterationsWithCheckpoint();

    // iteration overviews starting at the current checkpoint (sorted DESC by instance id)
    List<ForeachIterationOverview> results =
        instanceDao.getForeachIterationOverviewWithCheckpoint(
            artifact.getForeachWorkflowId(),
            artifact.getForeachRunId(),
            stepOverview.getCheckpoint(),
            false);

    if (!results.isEmpty()) {
      long maxIterationId = results.getFirst().getInstanceId(); // results are sorted in DESC;
      // new checkpoint: smallest still non-terminal instance id, or one past the largest loaded id
      // when every loaded iteration is terminal
      long newCheckpoint =
          results.stream()
              .filter(rs -> !rs.getStatus().isTerminal())
              .mapToLong(ForeachIterationOverview::getInstanceId)
              .min()
              .orElse(maxIterationId + 1);
      // the checkpoint must never move backwards
      Checks.checkTrue(
          newCheckpoint >= stepOverview.getCheckpoint(),
          "In artifact [%s], updated checkpoint [%s] must be no less than the previous one [%s]",
          artifact,
          newCheckpoint,
          stepOverview.getCheckpoint());
      stepOverview.setCheckpoint(newCheckpoint);
    }

    // reset running stats, then re-add restarted iterations followed by the loaded ones
    stepOverview.resetRunning();
    for (ForeachIterationOverview result : restartResults) {
      stepOverview.addOne(result.getInstanceId(), result.getStatus(), result.getRollupOverview());
      if (result.getStatus().isTerminal()) {
        // a restarted iteration that reached a terminal state no longer needs restart tracking
        stepOverview.getRestartInfo().remove(result.getInstanceId());
      }
    }

    results.stream()
        .filter(o -> !skipList.contains(o.getInstanceId()))
        .forEach(
            result ->
                stepOverview.addOne(
                    result.getInstanceId(), result.getStatus(), result.getRollupOverview()));

    stepOverview.refreshDetail();

    // instance id -> run id of the restarted iterations (Collectors.toMap throws on duplicate ids)
    return restartResults.stream()
        .collect(
            Collectors.toMap(
                ForeachIterationOverview::getInstanceId, ForeachIterationOverview::getRunId));
  }

  /**
   * Builds the inline workflow definition that wraps the steps nested inside a foreach step.
   *
   * <p>The definition intentionally declares no params. Loop params, workflow params, and the
   * foreach step's user-defined params (except the loop params map) are supplied as run params
   * when each iteration is started. The workflow timeout is not derived from the foreach step, and
   * the inline steps do not inherit the foreach step's failure mode.
   *
   * @param foreachWorkflowId id assigned to the inline workflow
   * @param foreachIdentity identity of the foreach step, used as the workflow name
   * @param foreachStep the foreach step whose tags and nested steps are reused
   * @return the inline workflow definition
   */
  private static Workflow createInlineWorkflow(
      String foreachWorkflowId, String foreachIdentity, ForeachStep foreachStep) {
    final String description =
        "Maestro foreach inline workflow including steps within foreach created by "
            + foreachIdentity;
    return Workflow.builder()
        .id(foreachWorkflowId)
        .name(foreachIdentity)
        .description(description)
        .tags(foreachStep.getTags())
        .params(Collections.emptyMap())
        .steps(foreachStep.getSteps())
        .build();
  }

  /**
   * Builds the ordered run params for a single foreach iteration of the inline workflow.
   *
   * <p>The map is filled in this order:
   *
   * <ol>
   *   <li>every workflow param visible to the foreach step;
   *   <li>every param declared on the step, taking its evaluated runtime value;
   *   <li>the {@code LOOP_PARAMS_NAME} entry is removed;
   *   <li>the {@code INDEX_PARAM_NAME} param carrying {@code index};
   *   <li>one param per loop param, picked from its array value.
   * </ol>
   *
   * <p>{@code index} is decoded as a mixed-radix number. Loop params are visited in {@code
   * getParamNames()} order, and each one contributes one digit whose radix is its array length.
   * The first loop param therefore varies fastest.
   *
   * @param index zero-based iteration index
   * @param workflowSummary summary of the parent workflow instance
   * @param step the foreach step definition
   * @param runtimeSummary runtime summary holding the evaluated step params
   * @return ordered map of param definitions to pass to the inline workflow run
   * @throws MaestroInternalError if a loop param is not a string/long/double/boolean array
   */
  private Map<String, ParamDefinition> createForeachRunParams(
      final int index,
      WorkflowSummary workflowSummary,
      final Step step,
      final StepRuntimeSummary runtimeSummary) {
    final Map<String, ParamDefinition> params = new LinkedHashMap<>();
    // workflow params visible inside the foreach come first
    workflowSummary.getParams().forEach((key, param) -> params.put(key, param.toDefinition()));
    // then the step's own params, using their evaluated runtime values
    step.getParams()
        .forEach(
            (key, unused) -> params.put(key, runtimeSummary.getParams().get(key).toDefinition()));
    // the raw loop params map itself is not forwarded to the inline workflow
    params.remove(LOOP_PARAMS_NAME);

    params.put(INDEX_PARAM_NAME, ParamDefinition.buildParamDefinition(INDEX_PARAM_NAME, index));

    final MapParameter loopParams = runtimeSummary.getParams().get(LOOP_PARAMS_NAME).asMapParam();
    int remaining = index;
    for (String loopName : loopParams.getParamNames()) {
      final Parameter loopParam = loopParams.getEvaluatedParam(loopName);
      final int radix;
      final ParamDefinition definition;
      switch (loopParam.getType()) {
        case STRING_ARRAY -> {
          String[] values = loopParam.asStringArray();
          radix = values.length;
          definition = ParamDefinition.buildParamDefinition(loopName, values[remaining % radix]);
        }
        case LONG_ARRAY -> {
          long[] values = loopParam.asLongArray();
          radix = values.length;
          definition = ParamDefinition.buildParamDefinition(loopName, values[remaining % radix]);
        }
        case DOUBLE_ARRAY -> {
          double[] values = loopParam.asDoubleArray();
          radix = values.length;
          definition = ParamDefinition.buildParamDefinition(loopName, values[remaining % radix]);
        }
        case BOOLEAN_ARRAY -> {
          boolean[] values = loopParam.asBooleanArray();
          radix = values.length;
          definition = ParamDefinition.buildParamDefinition(loopName, values[remaining % radix]);
        }
        default ->
            throw new MaestroInternalError(
                "Found an invalid loop param with name [%s] and type [%s] for foreach step %s",
                loopName, loopParam.getType(), runtimeSummary.getIdentity());
      }
      params.put(loopName, definition);
      // drop the digit consumed by this loop param before moving to the next one
      remaining /= radix;
    }
    return params;
  }

  /**
   * Terminates the foreach step by waiting until none of its iterations is still running.
   *
   * <p>When no foreach artifact exists, no iteration was ever launched and the step is immediately
   * STOPPED. Otherwise, the iteration overview is refreshed. While iterations are still running,
   * queued inline instances are terminated, the underlying actors are woken up, and a {@link
   * MaestroRetryableError} is thrown so the termination is retried. Once nothing is running, the
   * step is STOPPED with the refreshed artifact attached.
   *
   * <p>Per the original design notes, kill or stop API calls always lead to STOPPED here. The
   * termination action has already been written to the DB, and the final step instance
   * termination callback does the cleanup, including deleting the action.
   *
   * @param workflowSummary summary of the parent workflow instance
   * @param runtimeSummary runtime summary of the foreach step
   * @return a STOPPED result
   * @throws MaestroRetryableError while some foreach iterations are still running
   */
  @Override
  public Result terminate(WorkflowSummary workflowSummary, StepRuntimeSummary runtimeSummary) {
    final var foreachKey = Artifact.Type.FOREACH.key();
    if (!runtimeSummary.getArtifacts().containsKey(foreachKey)) {
      // no foreach artifact: nothing was launched, so there is nothing to wait for
      LOG.debug(
          "foreach step {}{} haven't start any foreach iteration and do nothing.",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity());
      return Result.of(State.STOPPED);
    }

    final ForeachArtifact artifact = runtimeSummary.getArtifacts().get(foreachKey).asForeach();
    final var restartRunIdMap = refreshIterationOverview(artifact);
    final boolean stillRunning = artifact.getForeachOverview().getRunningStatsCount(false) != 0;
    if (stillRunning) {
      tryTerminateQueuedInstancesIfNeeded(artifact);
      wakeUpUnderlyingActors(workflowSummary, artifact, restartRunIdMap);
      throw new MaestroRetryableError(
          "Termination at foreach step %s%s is not done and will retry it.",
          workflowSummary.getIdentity(), runtimeSummary.getIdentity());
    }

    return new Result(
        State.STOPPED,
        Collections.singletonMap(artifact.getType().key(), artifact),
        Collections.singletonList(
            TimelineLogEvent.info("Stopped all iterations because foreach step is terminated")));
  }

  /**
   * Stops queued (CREATED) inline instances of the foreach workflow, if the overview reports any.
   *
   * <p>Instances are terminated in batches of {@code Constants.TERMINATE_BATCH_LIMIT}. Batches are
   * issued until one terminates fewer than a full batch, and the total count is then logged.
   *
   * @param artifact the foreach artifact whose inline workflow instances are checked
   */
  private void tryTerminateQueuedInstancesIfNeeded(ForeachArtifact artifact) {
    final ForeachStepOverview overview = artifact.getForeachOverview();
    if (overview == null || !overview.statusExistInIterations(WorkflowInstance.Status.CREATED)) {
      return;
    }

    int totalTerminated = 0;
    int batchTerminated;
    do {
      batchTerminated =
          instanceDao.terminateQueuedInstances(
              artifact.getForeachWorkflowId(),
              Constants.TERMINATE_BATCH_LIMIT,
              WorkflowInstance.Status.STOPPED,
              "The queued workflow instance is terminated by its upstream foreach step.");
      totalTerminated += batchTerminated;
    } while (batchTerminated == Constants.TERMINATE_BATCH_LIMIT);

    LOG.info(
        "Foreach step terminated [{}] queued foreach instances with foreach artifact{}.",
        totalTerminated,
        artifact);
  }

  /**
   * Notifies the queue system to wake up the actors of every non-terminal foreach iteration.
   *
   * <p>Each non-terminal iteration is mapped to the artifact's foreach run id. A restarted
   * iteration instead uses its own run id from {@code restartRunIdMap}, and only iterations
   * already present in the map are overridden. Nothing is sent when the overview has no details.
   *
   * @param summary summary of the parent workflow instance, providing the group info
   * @param artifact the foreach artifact whose iterations are woken up
   * @param restartRunIdMap instance id to run id for iterations being restarted
   */
  private void wakeUpUnderlyingActors(
      WorkflowSummary summary, ForeachArtifact artifact, Map<Long, Long> restartRunIdMap) {
    final var details = artifact.getForeachOverview().getDetails();
    if (details == null) {
      return;
    }

    final var instanceRunIds =
        details.flatten(e -> !e.isTerminal()).values().stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toMap(id -> id, id -> artifact.getForeachRunId()));
    // restarted iterations run under their own run id; ids not already mapped are ignored
    restartRunIdMap.forEach((instanceId, runId) -> instanceRunIds.replace(instanceId, runId));

    queueSystem.notify(
        MessageDto.createMessageForWakeUp(
            artifact.getForeachWorkflowId(), summary.getGroupInfo(), instanceRunIds));
  }
}
